Make ExternalTaskSensor work with Task SDK #48651
Conversation
c2e5875 to
b57ab8c
Compare
|
Oh thanks kaxil , wrote similar way for the |
d272433 to
7566433
Compare
|
@gopidesupavan I had to add |
7566433 to
25d6863
Compare
thanks, will rebase mine, once it merged. |
25d6863 to
2526882
Compare
amoghrajesh
left a comment
There was a problem hiding this comment.
Few comments from initial look
| router = VersionedAPIRouter() | ||
|
|
||
| ti_id_router = VersionedAPIRouter( |
There was a problem hiding this comment.
Can we add a comment here explaining why we need this one? It will be clear to reader too
| @router.get("/count", status_code=status.HTTP_200_OK) | ||
| def get_dr_count( | ||
| dag_id: str, | ||
| session: SessionDep, | ||
| logical_dates: Annotated[list[UtcDateTime] | None, Query()] = None, | ||
| run_ids: Annotated[list[str] | None, Query()] = None, | ||
| states: Annotated[list[str] | None, Query()] = None, | ||
| ) -> int: | ||
| """Get the count of DAG runs matching the given criteria.""" | ||
| query = select(func.count()).select_from(DagRun).where(DagRun.dag_id == dag_id) | ||
|
|
||
| if logical_dates: | ||
| query = query.where(DagRun.logical_date.in_(logical_dates)) | ||
|
|
||
| if run_ids: | ||
| query = query.where(DagRun.run_id.in_(run_ids)) | ||
|
|
||
| if states: | ||
| query = query.where(DagRun.state.in_(states)) | ||
|
|
||
| count = session.scalar(query) | ||
| return count or 0 |
There was a problem hiding this comment.
We will need to add cadwyn migration for the new endpoints: https://docs.cadwyn.dev/concepts/version_changes/
There was a problem hiding this comment.
I don't think so. We only need to add a migration for breaking changes (or changes to existing endpoints) from what I understand.
There was a problem hiding this comment.
Just checked it here: https://docs.cadwyn.dev/concepts/endpoint_migrations/#defining-endpoints-that-didnt-exist-in-old-versions. Seems we will need it. Let me take it up
There was a problem hiding this comment.
I just had a chat with the Cadwyn Author ( @zmievsa ) who recommends to only add it for breaking changes.
Depending on your needs. My general recommendation is to only add migrations for breaking changes
https://docs.cadwyn.dev/how_to/change_endpoints/#add-a-new-endpoint
There was a problem hiding this comment.
I'll make Cadwyn's docs more verbose on when it makes the most sense to add a migration. Concepts section mostly focuses on what's possible with Cadwyn while "how to" focuses on what you should actually do.
Either way 99% of the time it makes sense to add an endpoint to all versions since it's not a breaking change. Your users will thank you later
Update: https://docs.cadwyn.dev/concepts/endpoint_migrations/#defining-endpoints-that-didnt-exist-in-old-versions added a bunch of notes here and there about this.
|
|
||
| @router.only_exists_in_older_versions | ||
| @router.post( | ||
| @router.get("/count", status_code=status.HTTP_200_OK) |
There was a problem hiding this comment.
We will need to add cadwyn migration for the new endpoints: https://docs.cadwyn.dev/concepts/version_changes/
There was a problem hiding this comment.
I don't think so. We only need to add a migration for breaking changes (or changes to existing endpoints) from what I understand.
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
Show resolved
Hide resolved
| assert response.json() == "2024-01-02T00:00:00Z" | ||
|
|
||
|
|
||
| class TestGetCount: |
There was a problem hiding this comment.
| class TestGetCount: | |
| class TestGetTICount: |
providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
Show resolved
Hide resolved
| if AIRFLOW_V_3_0_PLUS: | ||
| dag_bag.bag_dag(dag=dag) | ||
| else: | ||
| dag_bag.bag_dag(dag=dag, root_dag=dag) | ||
|
|
There was a problem hiding this comment.
Lets just define a mini utility function, too many usages here
| resp = self.client.get("task-instances/count", params=params) | ||
| return TICount(count=resp.json()) |
There was a problem hiding this comment.
Safeguard it inside a try/except in case of a 500?
| resp = self.client.get("dag-runs/count", params=params) | ||
| return DRCount(count=resp.json()) |
There was a problem hiding this comment.
Safeguard in a try/except for cases like 500?
closes #47447
closes #47948
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.